c22e9098934d2fdf4205aae1e0f54eedc68bac33,jdbc-lib/src/test/java/com/streamsets/pipeline/stage/origin/jdbc/table/BasicIT.java,BasicIT,testSingleTableMultipleBatches,#,245
Before Change
.tableConfigBeans(ImmutableList.of(tableConfigBean))
.build();
SourceRunner runner = new SourceRunner.Builder(TableJdbcDSource.class, tableJdbcSource)
.addOutputLane("a").build();
runner.runInit();
try {
StageRunner.Output output = runner.runProduce("", 5);
List<Record> records = output.getRecords().get("a");
Assert.assertEquals(5, records.size());
checkRecords(EXPECTED_CRICKET_STARS_RECORDS.subList(0, 5), records);
output = runner.runProduce(output.getNewOffset(), 5);
records = output.getRecords().get("a");
Assert.assertEquals(5, records.size());
checkRecords(EXPECTED_CRICKET_STARS_RECORDS.subList(5, 10), records);
After Change
.tableConfigBeans(ImmutableList.of(tableConfigBean))
.build();
PushSourceRunner runner = new PushSourceRunner.Builder(TableJdbcDSource.class, tableJdbcSource)
.addOutputLane("a")
.setOnRecordError(OnRecordError.TO_ERROR)
.build();
runner.runInit();
try {
JdbcPushSourceTestCallback callback = new JdbcPushSourceTestCallback(runner, 2);
runner.runProduce(Collections.emptyMap(), 5, callback);
List<List<Record>> batchRecords = callback.waitForAllBatchesAndReset();
List<Record> records = batchRecords.get(0);
Assert.assertEquals(5, records.size());
checkRecords(EXPECTED_CRICKET_STARS_RECORDS.subList(0, 5), records);